[Feature] Support TTL on ShortTermMemory #657
Hi, @da-daken, thanks for your contribution. Due to the imminent code freeze for version 0.3, I will prioritize handling some must-have items for this release. This may delay my review of your two PRs, but I estimate that I will complete the reviews within a week at the latest. |
weiqingy
left a comment
Thanks for picking this up — a few cross-cutting questions before this lands:
- Python mirror is missing. python/flink_agents/api/core_options.py mirrors every entry in the Java AgentExecutionOptions today (MAX_RETRIES, RETRY_WAIT_INTERVAL, the async flags, etc.). The three new short-term-memory TTL options are not added on the Python side, so a Python user can't reach them via the typed API; they'd have to fall back to raw string keys. Could we add the mirrors (and a matching test) in this PR? CLAUDE.md is explicit about Java↔Python parity for shared options.
- Are we comfortable exposing StateTtlConfig.UpdateType and StateTtlConfig.StateVisibility directly in the public API? Two of the new options take Flink internal enums as their value type, in a class under org.apache.flink.agents.api.agents. That couples the public surface to Flink's state API forever, and makes the Python mirror painful (we'd need to redefine the enums on the Python side or fall back to strings). One alternative is to wrap them in our own enums (ShortTermMemoryTtlUpdate / ShortTermMemoryTtlVisibility) with one-to-one mappings: it costs a few lines and a translation step, and buys API independence. What do you think?
| } | ||
|
|
||
| @Test | ||
| void testTTLConfigurationApplied() throws Exception { |
Two gaps in coverage worth closing while we're here:
- TTL disabled path. No test exercises the early-return in maybeEnableShortTermMemoryTTL (TTL_MS = 0 or unset). That branch is the default for every existing pipeline, so a regression that accidentally enables TTL with 0ms would be silent. A short scenario with ttlMs = 0 confirming the third event1 returns EXISTING regardless of sleep would cover it.
- Default update-type / visibility. Both tests explicitly override the update-type to OnCreateAndWrite. The option defaults (OnReadAndWrite / NeverReturnExpired) are never actually exercised, so a regression in how the defaults are wired would slip through. One scenario that sets only TTL_MS and asserts the same expiry behavior would catch that.
| /** | ||
| * When {@link AgentExecutionOptions#SHORT_TERM_MEMORY_STATE_TTL_MS} is positive, attaches Flink | ||
| * {@link StateTtlConfig} to the short-term memory {@link MapStateDescriptor}. Unset, null, or | ||
| * non-positive values disable TTL (Flink does not allow zero/negative TTL). |
Nice — recording "Flink does not allow zero/negative TTL" at the workaround site spares future maintainers a trip through Flink source.
# Conflicts: # python/flink_agents/api/core_options.py
@weiqingy Thanks for your review. You're right, I submitted the new code. PTAL |
| new ConfigOption<>("rag.async", Boolean.class, true); | ||
|
|
||
| public static final ConfigOption<Long> SHORT_TERM_MEMORY_STATE_TTL_MS = | ||
| new ConfigOption<>("short-term-memory.state-ttl.ms", Long.class, 0L); |
Nit: 0L doubles as the "TTL disabled" sentinel, but that contract only lives in OperatorStateManager.maybeEnableShortTermMemoryTTL. A one-line javadoc here — e.g. "Set to a positive value in milliseconds to enable TTL; 0 (the default) disables it" — would spare future readers a trip into the runtime. Same applies to the two enum options below: worth noting they're only consulted when TTL_MS > 0.
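For illustration, a minimal sketch of what the documented option could look like. ConfigOptionSketch is a hypothetical stand-in for the project's ConfigOption (only the three-argument constructor shape seen in the diff is assumed), and isTtlEnabled is a made-up helper that restates the "positive enables, 0 or unset disables" contract; neither is the PR's actual code.

```java
/** Hypothetical stand-in for the project's ConfigOption (key, type, default value). */
final class ConfigOptionSketch<T> {
    final String key;
    final Class<T> type;
    final T defaultValue;

    ConfigOptionSketch(String key, Class<T> type, T defaultValue) {
        this.key = key;
        this.type = type;
        this.defaultValue = defaultValue;
    }
}

final class TtlOptionDocSketch {
    /**
     * TTL for short-term memory state, in milliseconds. Set to a positive value to enable TTL;
     * 0 (the default) disables it.
     */
    static final ConfigOptionSketch<Long> SHORT_TERM_MEMORY_STATE_TTL_MS =
            new ConfigOptionSketch<>("short-term-memory.state-ttl.ms", Long.class, 0L);

    /** Assumed helper: unset (null) or non-positive values leave TTL disabled. */
    static boolean isTtlEnabled(Long ttlMs) {
        return ttlMs != null && ttlMs > 0L;
    }
}
```

With the contract stated on the option itself, a reader does not need to open the runtime class to learn that the default leaves TTL off.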
| } | ||
|
|
||
| @Test | ||
| void testTTLConfigurationNotApplied() throws Exception { |
Nit: testTTLConfigurationNotApplied vs testTTLConfigurationApplied (line 109) is a bit misleading — TTL is configured in both; what differs is whether enough time elapsed for entries to expire (sleep_ms=0 vs 2000). Names like testValueStillVisibleBeforeTTLExpiry / testValueExpiresAfterTTL would express the actual assertion. Same suggestion for the Python mirror in short_term_memory_ttl_test.py:126,159.
|
LGTM — both prior concerns are addressed cleanly. Only two small readability nits inline. Thanks for the quick turnaround. |
xintongsong
left a comment
Thanks @da-daken for working on this, and thanks @weiqingy for the review.
I've also left a few comments. In addition to the inline comments, I'd suggest reorganizing the PR commits.
- We do not allow merge commits in PRs. Please rebase to resolve conflicts.
- Please separate unrelated hotfixes from the major changes.
- You may check the community guidelines for instructions and examples.
| private StateTtlConfig.UpdateType toFlinkUpdateType(ShortTermMemoryTtlUpdate updateType) { | ||
| switch (updateType) { | ||
| case ON_CREATE_AND_WRITE: | ||
| return StateTtlConfig.UpdateType.OnCreateAndWrite; | ||
| case ON_READ_AND_WRITE: | ||
| return StateTtlConfig.UpdateType.OnReadAndWrite; | ||
| default: | ||
| throw new IllegalArgumentException("Unsupported TTL update type: " + updateType); | ||
| } | ||
| } | ||
|
|
||
| private StateTtlConfig.StateVisibility toFlinkStateVisibility( | ||
| ShortTermMemoryTtlVisibility stateVisibility) { | ||
| switch (stateVisibility) { | ||
| case NEVER_RETURN_EXPIRED: | ||
| return StateTtlConfig.StateVisibility.NeverReturnExpired; | ||
| case RETURN_EXPIRED_IF_NOT_CLEANED_UP: | ||
| return StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp; | ||
| default: | ||
| throw new IllegalArgumentException( | ||
| "Unsupported TTL state visibility: " + stateVisibility); | ||
| } | ||
| } |
I suggest placing these two methods in UpdateType and StateVisibility, respectively. They are only related to the two enum types themselves, not to where they are used.
Hi @xintongsong, I’d prefer not to put the mapping methods on the public enum types.
Are we comfortable exposing StateTtlConfig.UpdateType and StateTtlConfig.StateVisibility directly in the public API? Two of the new options take Flink internal enums as their value type, in a class under org.apache.flink.agents.api.agents. That couples the public surface to Flink's state API forever, and makes the Python mirror painful (we'd need to redefine the enums on the Python side or fall back to strings).
As mentioned in the earlier review from @weiqingy, exposing Flink’s StateTtlConfig.UpdateType /
StateTtlConfig.StateVisibility directly would couple the Agents public API to Flink state APIs and
make the Python mirror harder. If ShortTermMemoryTtlUpdate or ShortTermMemoryTtlVisibility exposes
toFlink...() methods, we effectively reintroduce that coupling through the enum API.
I agree the mapping does not have to stay in OperatorStateManager. A good compromise would be to move it into a runtime-internal helper, for example:
final class ShortTermMemoryTtlConfigMapper {
static StateTtlConfig.UpdateType toFlinkUpdateType(ShortTermMemoryTtlUpdate updateType) {
...
}
static StateTtlConfig.StateVisibility toFlinkStateVisibility(
ShortTermMemoryTtlVisibility visibility) {
...
}
}
Makes sense to me. Thanks for the clarification.
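As a rough sketch of the agreed direction, the mapping could live in a package-private helper so the public enums carry no Flink types. To keep the example compilable without Flink on the classpath, FlinkTtlStandIn below is a stand-in that only mirrors the constant names of Flink's StateTtlConfig.UpdateType and StateTtlConfig.StateVisibility; the real helper would return the Flink enums directly. The class name ShortTermMemoryTtlConfigMapperSketch and the enum definitions are assumptions based on the names used in this thread.

```java
import java.util.Objects;

/** Stand-in mirroring the constant names of Flink's StateTtlConfig enums (not the real classes). */
final class FlinkTtlStandIn {
    enum UpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }
    enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }
}

/** Public-API enums as named in the review; free of any Flink dependency. */
enum ShortTermMemoryTtlUpdate { ON_CREATE_AND_WRITE, ON_READ_AND_WRITE }

enum ShortTermMemoryTtlVisibility { NEVER_RETURN_EXPIRED, RETURN_EXPIRED_IF_NOT_CLEANED_UP }

/** Runtime-internal helper: the only place that knows both enum families. */
final class ShortTermMemoryTtlConfigMapperSketch {
    private ShortTermMemoryTtlConfigMapperSketch() {}

    static FlinkTtlStandIn.UpdateType toFlinkUpdateType(ShortTermMemoryTtlUpdate updateType) {
        Objects.requireNonNull(updateType, "updateType");
        switch (updateType) {
            case ON_CREATE_AND_WRITE:
                return FlinkTtlStandIn.UpdateType.OnCreateAndWrite;
            case ON_READ_AND_WRITE:
                return FlinkTtlStandIn.UpdateType.OnReadAndWrite;
            default:
                throw new IllegalArgumentException("Unsupported TTL update type: " + updateType);
        }
    }

    static FlinkTtlStandIn.StateVisibility toFlinkStateVisibility(
            ShortTermMemoryTtlVisibility visibility) {
        Objects.requireNonNull(visibility, "visibility");
        switch (visibility) {
            case NEVER_RETURN_EXPIRED:
                return FlinkTtlStandIn.StateVisibility.NeverReturnExpired;
            case RETURN_EXPIRED_IF_NOT_CLEANED_UP:
                return FlinkTtlStandIn.StateVisibility.ReturnExpiredIfNotCleanedUp;
            default:
                throw new IllegalArgumentException(
                        "Unsupported TTL state visibility: " + visibility);
        }
    }
}
```

Keeping the helper package-private means a future change in Flink's state API touches only this one class, and the Python mirror only has to redefine the two small public enums.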
| * public static ResourceDesc openAIResponses() { | ||
| * return ResourceDescriptor.Builder.newBuilder(OpenAIResponsesModelConnection.class.getName()) | ||
| * .addInitialArgument("api_key", System.getenv("OPENAI_API_KEY")) | ||
| * .addInitialArgument("api_base_url", System.getenv("OPENAI_API_URL")) |
This change is unrelated to TTL. It should be placed in a separate commit. Usually, in such cases, we add [hotfix] commits for fixing existing issues before commits of the actual PR changes.
| */ | ||
| private void maybeEnableShortTermMemoryTTL( | ||
| MapStateDescriptor<String, MemoryObjectImpl.MemoryItem> descriptor, | ||
| AgentPlan agentPlan) { |
It's unnecessary to take the whole AgentPlan as an argument. AgentConfiguration should be enough.
|
|
||
| @Test | ||
| void testTTLConfigurationDisabledWithZeroTtl() throws Exception { | ||
| List<String> results = runScenario(0L, 2000L, true, true); |
Sleeping for 2s is unnecessarily long. If we expect the state to still exist after the sleep, sleep for a shorter time (e.g., 50ms). If we expect the state to be gone, we can set a small TTL (e.g., 50ms) and wait a bit longer (e.g., 200ms). Either way, 2s is too long.
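The timing suggested above can be illustrated with a small self-contained sketch. ExpiringStoreSketch is a hypothetical in-memory stand-in for TTL-enabled state (it is not Flink state and not the PR's runScenario helper); it only shows that a 50ms sleep with TTL disabled and a 50ms TTL with a 200ms wait are enough to separate the two outcomes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory store with write-time expiry; non-positive ttlMs disables expiry. */
final class ExpiringStoreSketch {
    private final long ttlMs;
    private final Map<String, Long> writeTimesNanos = new ConcurrentHashMap<>();

    ExpiringStoreSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    void put(String key) {
        writeTimesNanos.put(key, System.nanoTime());
    }

    boolean contains(String key) {
        Long writtenAt = writeTimesNanos.get(key);
        if (writtenAt == null) {
            return false;
        }
        if (ttlMs <= 0) {
            return true; // TTL disabled: entries never expire
        }
        long ageMs = (System.nanoTime() - writtenAt) / 1_000_000L;
        return ageMs < ttlMs;
    }
}

final class TtlTimingSketch {
    /** Writes one key, sleeps for sleepMs, and reports whether the key is still visible. */
    static boolean visibleAfter(long ttlMs, long sleepMs) {
        ExpiringStoreSketch store = new ExpiringStoreSketch(ttlMs);
        store.put("event1");
        try {
            Thread.sleep(sleepMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return store.contains("event1");
    }
}
```

Under these assumptions, visibleAfter(0L, 50L) covers the "state still exists" case and visibleAfter(50L, 200L) covers the "state has expired" case, with a total wait of about a quarter of a second instead of 2s per scenario.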
Linked issue: #580
Purpose of change
Support controlling TTL on Short Term memory.
Tests
ut
API
no api
Documentation
doc-needed / doc-not-needed / doc-included